/***
 * Copyright (c) 2021-2031 murenchao
 * fig is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.software.fig.mqtt.broker.impl.request

import cool.taomu.software.fig.classloader.FigClassLoaderManage.Autowired
import cool.taomu.software.fig.mqtt.broker.entity.PublishEntity
import cool.taomu.software.fig.mqtt.broker.entity.SubAckEntity
import cool.taomu.software.fig.mqtt.broker.entity.TopicEntity
import cool.taomu.software.fig.mqtt.broker.impl.Publish
import cool.taomu.software.fig.mqtt.broker.impl.PublishObservable
import cool.taomu.software.fig.mqtt.broker.impl.response.MQTTSubAck
import cool.taomu.software.fig.mqtt.broker.inter.IPublishObserver.Type
import cool.taomu.software.fig.mqtt.broker.inter.IRequest
import cool.taomu.software.fig.mqtt.broker.inter.IResponse
import cool.taomu.software.fig.mqtt.utils.CommonUtils
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.mqtt.MqttMessage
import io.netty.handler.codec.mqtt.MqttSubscribeMessage
import io.netty.handler.codec.mqtt.MqttTopicSubscription
import java.util.ArrayList
import java.util.List
import org.apache.oro.text.perl.Perl5Util
import org.slf4j.LoggerFactory

/**
 * Request handler for MQTT SUBSCRIBE packets.
 *
 * For every topic subscription carried by the packet a {@link TopicEntity} is built;
 * topics whose name passes the name check are registered with {@link PublishObservable}
 * for the subscribing client. The handler then starts a {@code Type.RETAIN} publish pass
 * for that client and returns the SUBACK response produced by the injected
 * {@code MQTTSubAck} responder.
 */
class MQTTSubscribe implements IRequest {

    // Bound to this class (was MQTTConnect) so log lines are attributed to the subscribe handler.
    static val LOG = LoggerFactory.getLogger(MQTTSubscribe);

    @Autowired(MQTTSubAck)
    IResponse<SubAckEntity> response

    @Autowired(cool.taomu.software.fig.mqtt.broker.impl.response.MQTTPublish)
    IResponse<PublishEntity> publishResp;

    /**
     * Processes one SUBSCRIBE packet.
     *
     * @param ctx         channel context of the subscribing client; the client id is read from its channel
     * @param mqttMessage the incoming message; it is cast to {@link MqttSubscribeMessage}
     * @return a single-element list holding the SUBACK response, or {@code null} when
     *         no topic entries were produced for the packet
     */
    override request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        var clientId = CommonUtils.getClientId(ctx.channel);
        if (clientId === null) {
            // Code improved for the Perl MQTT client: wait briefly, then look the client id up again.
            // NOTE(review): this blocks the calling thread for 100 ms; if this handler runs on a
            // Netty event-loop thread it delays the other channels of that loop - confirm.
            Thread.sleep(100)
            clientId = CommonUtils.getClientId(ctx.channel);
        }
        LOG.info("执行了MQTT Subscribe 命令 : {}", clientId);
        val subscribeMessage = mqttMessage as MqttSubscribeMessage;
        val int messageId = subscribeMessage.variableHeader().messageId();
        val validTopicList = registerTopics(ctx, subscribeMessage.payload().topicSubscriptions());

        // Guard BEFORE taking the monitor: synchronizing on a null reference would throw,
        // which made the original in-lock null check unreachable for the null case.
        if (validTopicList === null || validTopicList.isEmpty) {
            LOG.info("Valid all subscribe topic failure,messageId:{}", messageId);
            return null;
        }

        // NOTE(review): the list is created per call by registerTopics, so this monitor is
        // presumably uncontended; kept to preserve the original locking behavior.
        synchronized (validTopicList) {
            val entity = new SubAckEntity(messageId, validTopicList);
            LOG.info(entity.toString());
            // Push messages for the user's subscriptions.
            PublishObservable.instance.start(clientId, Type.RETAIN)
            return #[response.response(entity)];
        }
    }

    /**
     * Builds a {@link TopicEntity} for every requested subscription and registers the ones
     * whose topic name passes the name check with {@link PublishObservable}.
     *
     * Every requested topic is added to the returned list, whether or not its name passed
     * the check; only passing topics are registered.
     * NOTE(review): if rejected topics should be reported as failures in the SUBACK, that
     * has to be handled where the SubAckEntity is turned into return codes - confirm.
     *
     * @param ctx    channel context used to resolve the client id (also used as the lock object)
     * @param topics the topic subscriptions taken from the SUBSCRIBE payload
     * @return the list of topic entities, one per entry of {@code topics}, in the same order
     */
    def synchronized registerTopics(ChannelHandlerContext ctx, List<MqttTopicSubscription> topics) {
        synchronized (ctx) {
            val clientId = CommonUtils.getClientId(ctx.channel);
            val topicList = new ArrayList<TopicEntity>();
            // One matcher for the whole call instead of a new instance per topic.
            val p5 = new Perl5Util();
            for (MqttTopicSubscription subscription : topics) {
                val topic = new TopicEntity(subscription.topicName(), subscription.qualityOfService().value());
                topic.clientId = clientId;
                // Perl5-style "/pattern/" expression: the name must start with one or more
                // alphanumerics, followed by any run of '/' and alphanumerics, or by "/+" or "/#".
                // The empty alternative ("||") is redundant because the first alternative
                // already matches the empty string.
                if (p5.match("/^[A-Za-z0-9]+([\\/A-Za-z0-9]*|\\/\\+||\\/\\#)$/", topic.name)) {
                    val publish = new Publish(topic, this.publishResp);
                    PublishObservable.instance.register(clientId, publish);
                }
                topicList.add(topic);
            }
            return topicList;
        }
    }

}
